多线程事务如何保证效率和原子性

您所在的位置:网站首页 jpa多对多中间表 操作 多线程事务如何保证效率和原子性

多线程事务如何保证效率和原子性

2023-06-07 06:57| 来源: 网络整理| 查看: 265

开启掘金成长之旅!这是我参与「掘金日新计划 · 2 月更文挑战」的第 1 天,点击查看活动详情

多线程事务

在Spring开发时,遇到一个从Excel表导入数据到数据库的需求,当然最简单的方法就是先使用EasyExcel把数据读出到集合中,然后依次插入到数据库中。

但如何保证效率,原子性呢?我们一步步优化方案。这里不会引入不必要的组件,而是自己模拟类似的思想。

方法1:依次顺序插入

void test() { ? ? ? ?List users = getAllUsers(); ? ? ? ?users.forEach(user -> userService.save(user)); ? }

方法2:使用批处理,一次操作中执行多条SQL

void test() { ? ?List users = getAllUsers(); ? ?userService.saveBatch(users); }

方法3:使用多线程+批处理,每个线程插入多条数据

需要注意的一点,Spring容器不允许线程注入,也就是没办法在多线程直接使用Bean操作,例如:

void testThread() { // 下面两种方式是无效的,不会执行任何东西 Runnable runnable = () -> { ? ? ? ? userService.save(new User()); ? ? }; // 方法1 new Thread(runnable).start(); // 方法2 Executors.newFixedThreadPool(1).submit(runnable); }

我们需要下面的方式进行执行

void testThread() { ? ? Runnable runnable = () -> { ? ? ? ? userService.save(new User()); ? ? }; ? ? ExecutorService executorService = Executors.newFixedThreadPool(1); ? ? CompletableFuture future = CompletableFuture.runAsync(runnable, executorService); ? ? future.join(); } ? void testThread() { ? ?int threadSize = 5; ? ?ExecutorService executorService = Executors.newFixedThreadPool(threadSize); ? ?List list = new ArrayList(); ? ?for (int i = 0; i < threadSize; i++) { ? ? ? ?// 我们假设数据拆分为五分 ? ? ? ?list.add(getAllUsers()); ? } ? ?for (List users : list) { ? ? ? ?CompletableFuture.runAsync(()->{ ? ? ? ? ? ?userService.saveBatch(users); ? ? ? },executorService).join(); ? } ? ?System.out.println("插入成功"); }

方法4:这时候速度已经很快了,但是如果其中一个线程插入数据时发生错误进行回滚,其他线程是无法得知的,因为事务是针对线程的,所以这里我们需要用一些方式保证每个线程之间的状态是被共享的。

// UserService#saveUserSyn() @Override public boolean saveUserSyn(List users, CountDownLatch threadLatch, CountDownLatch mainLatch, UserError hasError) { ? ? ? ?TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); ? ? ? ?System.out.println("子线程:" + Thread.currentThread().getName()); ? ? ? ?try { ? ? ? ? ? ?users.forEach(this::save); ? ? ? } catch (Throwable e) { ? ? ? ? ? ?hasError.setHasError(true); ? ? ? } finally { ? ? ? ? ? ?threadLatch.countDown(); // 切换到主线程执行 ? ? ? } ? ? ? ?try { ? ? ? ? ? ?mainLatch.await(); ?//等待主线程执行 ? ? ? } catch (Throwable e) { ? ? ? ? ? ?hasError.setHasError(true); ? ? ? } ? ? ? ?// 判断是否有错误,如有错误 就回滚事务 ? ? ? ?if (hasError.isHasError()) { ? ? ? ? ? ?dataSourceTransactionManager.rollback(transactionStatus); ? ? ? } else { ? ? ? ? ? ?dataSourceTransactionManager.commit(transactionStatus); ? ? ? } ? ? ? ?return true; ? } // 测试方法 @Test ? ?void userSaveSyn() { ? ? ? ? ?List userList = getAllUsers(); ? ? ? ? ?// 添加一个错误数据 ? ? ? ?User user = new User(); ? ? ? ?user.setUserAccount(null); ? ? ? ?user.setUserPassword("123456"); ? ? ? ?userList.add(user); ? ? ? ? ?// 线程数量 ? ? ? ?final Integer threadCount = 4; ? ? ? ? ?//每个线程处理的数据量 ? ? ? ?final Integer dataPartionLength = (userList.size() + threadCount - 1) / threadCount; ? ? ? ? ?// 创建多线程处理任务 ? ? ? ?ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount); ? ? ? ?CountDownLatch threadLatchs = new CountDownLatch(threadCount); // 用于计算子线程提交数量 ? ? ? ?CountDownLatch mainLatch = new CountDownLatch(1); // 用于判断主线程是否提交 ? ? ? ? ?for (int i = 0; i < threadCount; i++) { ? ? ? ? ? ?// 每个线程处理的数据 ? ? ? ? ? ?List threadDatas = userList.stream() ? ? ? ? ? ? ? ? ? .skip(i * dataPartionLength).limit(dataPartionLength) ? ? ? ? ? ? ? ? ? .collect(Collectors.toList()); ? ? ? ? ? ?studentThreadPool.execute(() -> { ? ? ? ? ? ? ? ?userService.saveUserSyn(threadDatas, threadLatchs, mainLatch, hasError); ? ? ? ? ? }); ? ? ? } ? ? ? ? ?try { ? ? ? ? ? ?// 倒计时锁设置超时时间 30s ? ? ? ? ? ?boolean await = threadLatchs.await(30, TimeUnit.SECONDS); ? ? ? ? ? ?if (!await) { // 等待超时,事务回滚 ? ? ? ? ? ? ? ?hasError.setHasError(true); ? ? ? ? ? } ? ? ? } catch (Throwable e) { ? ? ? ? ? ?e.printStackTrace(); ? ? ? ? ? ?hasError.setHasError(true); ? ? ? } ? ? ? ?mainLatch.countDown(); // 切换到子线程执行 ? ? ? ?studentThreadPool.shutdown(); //关闭线程池 ? ? ? ?System.out.println("主线程完成"); ? }

这里我们使用CountDownLatch 和 Volatile来解决这个问题。

CountDownLatch的语法与原理讲解

Volatile保证线程间数据的可见性

2PC(两阶段提交),这个属于分布式事务的一个理论,这里模拟了这样的业务场景,大致流程为:

每个线程开启事务,插入数据,但不提交,向主线程通知说,我这里已经好了主线程等待一段时间,看是否所有的子线程都没问题了。如果超时也算是异常如果没有异常,主线程向所有子线程通知,可以提交事务如果有异常,主线程向所有子线程通知,进行回滚操作而中间使用Volatile修饰的hasError对象进行传达,是否出现异常。需要注意如果只是传递普通的boolean对象,可能会发生不一致的情况,我测试时没法通过。CountDownLatch则保证子线程在主线程没有通知前,是不能提交事务的。

这里细心些就会发现,即便是主线程通知子线程可以提交了,子线程依然有可能出现提交失败的可能,那其他线程提交事务是无法得知这边的失败的消息的。这就是我们其实无法在一个Java进程中保证多线程的原子性。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3